package com.zyf.apitest.state;

import com.zyf.apitest.beans.SensorReading;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import scala.Tuple3;

/**
 * @author Malegod_xiaofei
 * @create 2022-01-17-22:51
 */
public class StateTest4_FaultTolerance {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(1);

        // 状态后端配置
        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new FsStateBackend("hdfs路径"));
        env.setStateBackend(new RocksDBStateBackend("hdfs路径"));

        // socket 文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成 SensorReading 类型，分配时间戳和 watermark
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fileds = line.split(",");
            return new SensorReading(fileds[0], new Long(fileds[1]), new Double(fileds[2]));
        });

        dataStream.print();

        // 执行
        env.execute();
    }
}
